Spark Operations examples
Main Spark functionality
# -*- coding: utf-8 -*-
import os
os.chdir("/home/cloudops/spark")
os.getcwd()   # confirm the working directory
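# NOTE: this walkthrough assumes an interactive PySpark shell, where the
# SparkContext is already available as `sc`.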
#============================
# Loading Data From Files
#============================
# Lazy evaluation: the RDD is defined, but the file is not read yet
autoData = sc.textFile("data/auto-data.csv")
autoData.cache()
# The file is actually read only now (count() is an action)
autoData.count()
autoData.first()
autoData.take(5)
for line in autoData.collect():
    print(line)
#============================
# Loading Data From a Collection
#============================
collData = sc.parallelize([3,5,4,7,4])
collData.cache()
collData.count()
#============================
# Transformations
#============================
# Map (and create a new RDD)
tsvData = autoData.map(lambda x : x.replace(",","\t"))
tsvData.take(5)
# Filter (and create a new RDD)
toyotaData = autoData.filter(lambda x: "toyota" in x)
toyotaData.count()
# FlatMap (and create a new RDD)
words = toyotaData.flatMap(lambda line: line.split(","))
words.take(20)
# Distinct (collect() triggers execution)
for numbData in collData.distinct().collect():
    print(numbData, end=', ')  # 3, 5, 4, 7
# Set operations
words1 = sc.parallelize(["hello","war","peace","world"])
words2 = sc.parallelize(["war","peace","universe"])
# Union
for word in words1.union(words2).distinct().collect():
    print(word, end=", ")  # peace, world, universe, hello, war,
# Intersection
for word in words1.intersection(words2).collect():
    print(word, end=", ")  # peace, war,
#============================
# Actions
#============================
# Reduce
collData.reduce(lambda x, y: x + y) # 23
# Find the SHORTEST LINE
autoData.reduce(lambda x,y: x if len(x) < len(y) else y)
# 'bmw,gas,std,two,sedan,rwd,six,182,5400,16,22,41315'
# Aggregations
# =============
# Perform the same work as reduce
seqOp = (lambda x, y: (x+y))
combOp = (lambda x, y: (x+y))
collData.aggregate((0), seqOp, combOp) # 23
# Do addition and multiplication at the same time
# x is now a (sum, product) tuple in the sequence op
seqOp = (lambda x, y: (x[0] + y, x[1] * y))
# both x and y are (sum, product) tuples in the combine op
combOp = (lambda x, y: (x[0] + y[0], x[1] * y[1]))
# initial values: x[0] = 0 (sum), x[1] = 1 (product)
collData.aggregate((0,1), seqOp, combOp) # (23, 1680)
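# Worked trace (shown for a single partition; + and * are associative and
# commutative, so the result is the same for any partitioning):
# seqOp folds (0, 1) over [3, 5, 4, 7, 4]:
#   (0+3, 1*3)   -> (3, 3)
#   (3+5, 3*5)   -> (8, 15)
#   (8+4, 15*4)  -> (12, 60)
#   (12+7, 60*7) -> (19, 420)
#   (19+4, 420*4)-> (23, 1680)
# combOp then merges the per-partition result with the zero value:
#   (0+23, 1*1680) -> (23, 1680)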
#============================
# Functions in Spark
#============================
# Cleanse and transform an RDD
def cleanseRDD(autoStr):
    if isinstance(autoStr, int):
        return autoStr
    attList = autoStr.split(",")
    # Convert doors to a number
    if attList[3] == "two":
        attList[3] = "2"
    else:
        attList[3] = "4"
    # Convert drive to uppercase
    attList[5] = attList[5].upper()
    return ",".join(attList)
cleanedData = autoData.map(cleanseRDD)
cleanedData.collect()
# ['MAKE,FUELTYPE,ASPIRE,4,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
# 'subaru,gas,std,2,hatchback,FWD,four,69,4900,31,36,5118',
# 'chevrolet,gas,std,2,hatchback,FWD,three,48,5100,47,53,5151',
# 'mazda,gas,std,2,hatchback,FWD,four,68,5000,30,31,5195',
# . . .
# Use a helper function inside reduce
def getMPG(autoStr):
    # the argument may already be the previously accumulated (integer) value
    if isinstance(autoStr, int):
        return autoStr
    # otherwise extract MPG-City from the new line
    attList = autoStr.split(",")
    if attList[9].isdigit():
        return int(attList[9])
    else:
        return 0
# Find the average MPG-City for ALL cars
# (count() - 1 excludes the header row, which getMPG maps to 0)
autoData.reduce(lambda x, y: getMPG(x) + getMPG(y)) \
    / (autoData.count() - 1)
# 25.15228426395939
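# A hedged alternative sketch: filter out the header, map MPG-City to int,
# and use the built-in mean() action. Assumes every data row holds a numeric
# MPG-City value; `headerLine` is an illustrative helper name, not from the
# original script.
headerLine = autoData.first()
autoData.filter(lambda line: line != headerLine) \
    .map(lambda line: int(line.split(",")[9])) \
    .mean()
# should match the value above (~25.15)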
#============================
# Working with Key/Value RDDs
#============================
# Create a K:V RDD of auto Brand and Horsepower
cylData = autoData.map(lambda x: (x.split(",")[0], x.split(",")[7]))
cylData.take(3)
# [('MAKE', 'HP'), ('subaru', '69'), ('chevrolet', '48')]
cylData.keys().collect() # repeated keys
cylData.keys().count() # 198
# Remove header row - new RDD
header = cylData.first()
cylHPData = cylData.filter(lambda line: line != header)
# cylHPData.collect()
# Pair each HP value with a count of 1, then reduceByKey
# to get totals of HP and record counts per brand
brandValues = cylHPData.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), x[1] + y[1]))
brandValues.collect()
# [('subaru', (1035, 12)),
# ('chevrolet', (188, 3)),
# ('mazda', (1390, 16)),
# . . .
# ('porsche', (764, 4)),
# ('mercedes-benz', (1170, 8)),
# ('jaguar', (614, 3))]
# Find Average by dividing HP total by count total
brandValues.mapValues(lambda x: int(x[0])/int(x[1])).collect()
# [('subaru', 86.25),
# ('chevrolet', 62.666666666666664),
# ('mazda', 86.875),
# . . .
# ('porsche', 191.0),
# ('mercedes-benz', 146.25),
# ('jaguar', 204.66666666666666)]
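# A hedged alternative for the same per-brand average using aggregateByKey:
# the zero value (0, 0) accumulates (HP total, record count) per key.
# `hpAvg` is an illustrative name, not from the original script.
hpAvg = cylHPData.aggregateByKey(
    (0, 0),
    lambda acc, hp: (acc[0] + int(hp), acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1])
).mapValues(lambda t: t[0] / t[1])
hpAvg.collect()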
#============================
# Advanced Spark: Accumulators & Broadcast Variables
#============================
# Function that splits the line as well as counts sedans
# and hatchbacks
# Speed optimization
# ==================
# Initialize accumulators (global)
sedanCount = sc.accumulator(0)
hatchbackCount = sc.accumulator(0)
# Set broadcast variables (read-only values shared with all workers)
sedanText = sc.broadcast("sedan")
hatchbackText = sc.broadcast("hatchback")
def splitLines(line):
    global sedanCount
    global hatchbackCount
    # Use the broadcast variables for the comparison
    # and update the accumulators
    if sedanText.value in line:
        sedanCount += 1
    if hatchbackText.value in line:
        hatchbackCount += 1
    return line.split(",")
# Do the map
splitData = autoData.map(splitLines)
# Force the lazy map to execute by running an action
splitData.count()
print(sedanCount, hatchbackCount) # 92 67
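# Accumulator values are read on the driver via .value. General Spark caveat:
# updates made inside transformations can be re-applied if a stage is
# recomputed, so the counts are reliable here because the cached RDD is
# evaluated by a single action.
sedanCount.value      # 92
hatchbackCount.value  # 67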
#============================
# Advanced Spark: Partitions
#============================
collData.getNumPartitions() # 1
# Specify no. of partitions
collData = sc.parallelize([3,5,4,7,4], 2)
collData.cache()
collData.count() # 5
collData.getNumPartitions() # 2
# The Spark UI at localhost:4040 shows the current application (jobs, stages, storage)
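# A small hedged sketch: glom() turns each partition into a list, which makes
# the element-to-partition layout visible.
collData.glom().collect()  # e.g. [[3, 5], [4, 7, 4]]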